{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "41ea5a6d-f7cf-4d60-a093-4947b9b7f9b8",
   "metadata": {},
   "outputs": [],
   "source": [
    "import logging\n",
    "import sys\n",
    "import os\n",
    "from datetime import datetime\n",
    "from decimal import Decimal\n",
    "from IPython.display import display, HTML\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DecimalType, LongType\n",
    "\n",
    "\n",
    "# Logging configuration\n",
    "formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')\n",
    "handler = logging.StreamHandler(sys.stdout)\n",
    "handler.setLevel(logging.INFO)\n",
    "handler.setFormatter(formatter)\n",
    "logger = logging.getLogger()\n",
    "logger.setLevel(logging.INFO)\n",
    "logger.addHandler(handler)\n",
    "\n",
    "\n",
    "# Application-specific variables\n",
    "dt_string = datetime.now().strftime(\"%Y_%m_%d_%H_%M_%S\")\n",
    "AppName = \"Demo\"\n",
    "# AWS specific variables\n",
    "region = os.environ.get('AWS_REGION', 'us-east-1')\n",
    "\n",
    "# Replace S3_BUCKET and ACCOUNT_NUMBER with your own values\n",
    "input_csv_path = \"s3a://<S3_BUCKET>/s3table-example/input/\"\n",
    "# Ensure this table bucket exists\n",
    "s3table_arn = f\"arn:aws:s3tables:{region}:<ACCOUNT_NUMBER>:bucket/doeks-spark-s3-tables\"\n",
    "namespace = \"doeks_namespace\"\n",
    "table_name = \"employee_s3_table\"\n",
    "full_table_name = f\"s3tablesbucket.{namespace}.{table_name}\"\n",
    "\n",
    "spark = (SparkSession\n",
    "    .builder\n",
    "    .appName(f\"{AppName}_{dt_string}\")\n",
    "    .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket\", \"org.apache.iceberg.spark.SparkCatalog\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.type\", \"rest\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.warehouse\", s3table_arn)\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.uri\", f\"https://s3tables.{region}.amazonaws.com/iceberg\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.rest.sigv4-enabled\", \"true\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.rest.signing-name\", \"s3tables\")\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.rest.signing-region\", region)\n",
    "    .config(\"spark.sql.catalog.s3tablesbucket.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n",
    "    .config('spark.hadoop.fs.s3.impl', \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n",
    "    .config(\"spark.sql.defaultCatalog\", \"s3tablesbucket\")\n",
    "    .config(\"spark.hadoop.fs.s3a.connection.timeout\", \"1200000\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.connection.maximum\", \"200\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.fast.upload\", \"true\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.readahead.range\", \"256K\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.input.fadvise\", \"random\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.aws.credentials.provider.mapping\", \"com.amazonaws.auth.WebIdentityTokenCredentialsProvider=software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider\") \\\n",
    "    .config(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider\")\n",
    "    .getOrCreate())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "baca6897-37d8-4374-b632-caac952cef07",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create namespace if not exists\n",
    "spark.sql(f\"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "e2c2d161-27d0-4519-af19-853e1cdded8e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- id: integer (nullable = true)\n",
      " |-- name: string (nullable = true)\n",
      " |-- level: string (nullable = true)\n",
      " |-- salary: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Read input CSV data\n",
    "employee_df = spark.read.csv(input_csv_path, header=True, inferSchema=True)\n",
    "employee_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "d2ed084f-4ae5-45aa-91d7-a79f9254954a",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create a table\n",
    "spark.sql(f\"\"\"\n",
    "    CREATE TABLE IF NOT EXISTS {full_table_name} (\n",
    "        id INT,\n",
    "        name STRING,\n",
    "        level STRING,\n",
    "        salary DOUBLE\n",
    "    )\n",
    "    USING iceberg\n",
    "    OPTIONS ('format-version'='2')\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "9bbfb7b8-0022-40cd-aab3-ce9cb8d65a0f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Write to the table\n",
    "employee_df.writeTo(full_table_name).using('iceberg').append()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "e8631bea-26f5-4326-8d7e-e385350a98a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read back from S3 Tables\n",
    "iceberg_data_df = spark.read.format(\"iceberg\").load(full_table_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "a04dac5b-6d41-470e-87f3-e888ff8e3958",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----------+------+--------+\n",
      "|id |name       |level |salary  |\n",
      "+---+-----------+------+--------+\n",
      "|1  |Employee_1 |Senior|197000.0|\n",
      "|2  |Employee_2 |Mid   |117500.0|\n",
      "|3  |Employee_3 |Senior|54500.0 |\n",
      "|4  |Employee_4 |Mid   |110000.0|\n",
      "|5  |Employee_5 |Junior|59000.0 |\n",
      "|6  |Employee_6 |Mid   |165500.0|\n",
      "|7  |Employee_7 |Senior|137000.0|\n",
      "|8  |Employee_8 |Junior|71000.0 |\n",
      "|9  |Employee_9 |Exec  |140000.0|\n",
      "|10 |Employee_10|Senior|129500.0|\n",
      "+---+-----------+------+--------+\n",
      "only showing top 10 rows\n",
      "\n",
      "DataFrame count: 700\n",
      "+--------------------+-------------------+-------------------+-------------------+\n",
      "|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|\n",
      "+--------------------+-------------------+-------------------+-------------------+\n",
      "|2025-04-14 17:04:...|1247367129977964401|               NULL|               true|\n",
      "|2025-04-14 17:04:...|2293528557878885471|1247367129977964401|               true|\n",
      "|2025-04-14 17:29:...|8156129775284340201|2293528557878885471|               true|\n",
      "|2025-04-14 17:39:...|7353788801766500304|8156129775284340201|               true|\n",
      "|2025-04-14 17:41:...|8234500388044211909|7353788801766500304|               true|\n",
      "|2025-04-14 17:47:...|5976269847974860656|8234500388044211909|               true|\n",
      "|2025-04-14 18:30:...|2240011576343306175|5976269847974860656|               true|\n",
      "+--------------------+-------------------+-------------------+-------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# List the table snapshots\n",
    "iceberg_data_df.show(10, truncate=False)\n",
    "print(f\"DataFrame count: {iceberg_data_df.count()}\")\n",
    "spark.sql(f\"SELECT * FROM {full_table_name}.history LIMIT 10\").show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
